{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "bdcd9d53",
   "metadata": {},
   "source": [
    "```\n",
    "Licensed to the Apache Software Foundation (ASF) under one\n",
    "or more contributor license agreements.  See the NOTICE file\n",
    "distributed with this work for additional information\n",
    "regarding copyright ownership.  The ASF licenses this file\n",
    "to you under the Apache License, Version 2.0 (the\n",
    "\"License\"); you may not use this file except in compliance\n",
    "with the License.  You may obtain a copy of the License at\n",
    "  http://www.apache.org/licenses/LICENSE-2.0\n",
    "Unless required by applicable law or agreed to in writing,\n",
    "software distributed under the License is distributed on an\n",
    "\"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "KIND, either express or implied.  See the License for the\n",
    "specific language governing permissions and limitations\n",
    "under the License.\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "00514c5b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# pip install sklearn\n",
    "# pip install pyarrow\n",
    "# pip install fsspec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "92e81634",
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.display import display, HTML\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark import StorageLevel\n",
    "import pandas as pd\n",
    "from pyspark.sql.types import (\n",
    "    StructType,\n",
    "    StructField,\n",
    "    StringType,\n",
    "    LongType,\n",
    "    IntegerType,\n",
    "    DoubleType,\n",
    "    ArrayType,\n",
    ")\n",
    "from pyspark.sql.functions import regexp_replace\n",
    "from sedona.spark import SedonaRegistrator\n",
    "from sedona.spark import SedonaKryoRegistrator, KryoSerializer\n",
    "from pyspark.sql.functions import col, split, expr\n",
    "from pyspark.sql.functions import udf, lit\n",
    "from sedona.spark import SedonaKryoRegistrator, KryoSerializer\n",
    "from pyspark.sql.functions import col, split, expr\n",
    "from pyspark.sql.functions import udf, lit, flatten\n",
    "from pywebhdfs.webhdfs import PyWebHdfsClient\n",
    "from datetime import date"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "6f504f33",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "analise_folder = \"analise_teste_\" + str(date.today())\n",
    "hdfs = PyWebHdfsClient(host=\"179.106.229.159\", port=\"50070\", user_name=\"root\")\n",
    "hdfs.delete_file_dir(analise_folder, recursive=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "d4d8efa5",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Warning: Ignoring non-Spark config property:  spark.sql.adaptive.coalescePartitions.enabled\n",
      "Ivy Default Cache set to: /root/.ivy2/cache\n",
      "The jars for the packages stored in: /root/.ivy2/jars\n",
      ":: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n",
      "org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency\n",
      "org.datasyslab#geotools-wrapper added as a dependency\n",
      ":: resolving dependencies :: org.apache.spark#spark-submit-parent-475a8539-e626-41a7-8cca-cb3c72ad1694;1.0\n",
      "\tconfs: [default]\n",
      "\tfound org.apache.sedona#sedona-python-adapter-3.0_2.12;1.1.0-incubating in central\n",
      "\tfound org.locationtech.jts#jts-core;1.18.0 in central\n",
      "\tfound org.wololo#jts2geojson;0.16.1 in central\n",
      "\tfound com.fasterxml.jackson.core#jackson-databind;2.12.2 in central\n",
      "\tfound com.fasterxml.jackson.core#jackson-annotations;2.12.2 in central\n",
      "\tfound com.fasterxml.jackson.core#jackson-core;2.12.2 in central\n",
      "\tfound org.apache.sedona#sedona-core-3.0_2.12;1.1.0-incubating in central\n",
      "\tfound org.apache.sedona#sedona-sql-3.0_2.12;1.1.0-incubating in central\n",
      "\tfound org.datasyslab#geotools-wrapper;1.1.0-25.2 in central\n",
      ":: resolution report :: resolve 503ms :: artifacts dl 8ms\n",
      "\t:: modules in use:\n",
      "\tcom.fasterxml.jackson.core#jackson-annotations;2.12.2 from central in [default]\n",
      "\tcom.fasterxml.jackson.core#jackson-core;2.12.2 from central in [default]\n",
      "\tcom.fasterxml.jackson.core#jackson-databind;2.12.2 from central in [default]\n",
      "\torg.apache.sedona#sedona-core-3.0_2.12;1.1.0-incubating from central in [default]\n",
      "\torg.apache.sedona#sedona-python-adapter-3.0_2.12;1.1.0-incubating from central in [default]\n",
      "\torg.apache.sedona#sedona-sql-3.0_2.12;1.1.0-incubating from central in [default]\n",
      "\torg.datasyslab#geotools-wrapper;1.1.0-25.2 from central in [default]\n",
      "\torg.locationtech.jts#jts-core;1.18.0 from central in [default]\n",
      "\torg.wololo#jts2geojson;0.16.1 from central in [default]\n",
      "\t:: evicted modules:\n",
      "\torg.locationtech.jts#jts-core;1.18.1 by [org.locationtech.jts#jts-core;1.18.0] in [default]\n",
      "\t---------------------------------------------------------------------\n",
      "\t|                  |            modules            ||   artifacts   |\n",
      "\t|       conf       | number| search|dwnlded|evicted|| number|dwnlded|\n",
      "\t---------------------------------------------------------------------\n",
      "\t|      default     |   10  |   0   |   0   |   1   ||   9   |   0   |\n",
      "\t---------------------------------------------------------------------\n",
      ":: retrieving :: org.apache.spark#spark-submit-parent-475a8539-e626-41a7-8cca-cb3c72ad1694\n",
      "\tconfs: [default]\n",
      "\t0 artifacts copied, 9 already retrieved (0kB/5ms)\n",
      "22/01/06 20:04:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "22/01/06 20:04:45 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:04:45 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "# spark.scheduler.mode', 'FAIR'\n",
    "spark = (\n",
    "    SparkSession.builder.appName(\"Sentinel-app\")\n",
    "    .enableHiveSupport()\n",
    "    .master(\"local[*]\")\n",
    "    .master(\"spark://spark-master:7077\")\n",
    "    .config(\"spark.executor.memory\", \"15G\")\n",
    "    .config(\"spark.driver.maxResultSize\", \"135G\")\n",
    "    .config(\"spark.sql.shuffle.partitions\", \"500\")\n",
    "    .config(\" spark.sql.adaptive.coalescePartitions.enabled\", True)\n",
    "    .config(\"spark.sql.adaptive.enabled\", True)\n",
    "    .config(\"spark.sql.adaptive.coalescePartitions.initialPartitionNum\", 125)\n",
    "    .config(\"spark.sql.execution.arrow.pyspark.enabled\", True)\n",
    "    .config(\"spark.sql.execution.arrow.fallback.enabled\", True)\n",
    "    .config(\"spark.kryoserializer.buffer.max\", 2047)\n",
    "    .config(\"spark.serializer\", KryoSerializer.getName)\n",
    "    .config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName)\n",
    "    .config(\n",
    "        \"spark.jars.packages\",\n",
    "        \"org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2\",\n",
    "    )\n",
    "    .enableHiveSupport()\n",
    "    .getOrCreate()\n",
    ")\n",
    "\n",
    "SedonaRegistrator.registerAll(spark)\n",
    "sc = spark.sparkContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "03300d21",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "# Path to directory of geotiff images\n",
    "DATA_DIR = \"hdfs://776faf4d6a1e:8020/sentinel2_tmp/*\"\n",
    "df = spark.read.format(\"geotiff\").option(\"dropInvalid\", True).load(DATA_DIR)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "53550d54",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "From local[5]4\n"
     ]
    }
   ],
   "source": [
    "# SUPER IMPORTANT ULTRA MEGA POWER FOR MEMORY PROBLENS SOLVE\n",
    "rdd = spark.sparkContext.parallelize((0, 20))\n",
    "print(\"From local[5]\" + str(rdd.getNumPartitions()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "0907df80",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- image: struct (nullable = true)\n",
      " |    |-- origin: string (nullable = true)\n",
      " |    |-- wkt: string (nullable = true)\n",
      " |    |-- height: integer (nullable = true)\n",
      " |    |-- width: integer (nullable = true)\n",
      " |    |-- nBands: integer (nullable = true)\n",
      " |    |-- data: array (nullable = true)\n",
      " |    |    |-- element: double (containsNull = true)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:04:57 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
     ]
    }
   ],
   "source": [
    "df.cache()\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "80ee3d06",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "*(1) Project [image#14, monotonically_increasing_id() AS id#22L]\n",
      "+- InMemoryTableScan [image#14]\n",
      "      +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "            +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 4:>                                                          (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+---+\n",
      "|               image| id|\n",
      "+--------------------+---+\n",
      "|[hdfs://776faf4d6...|  0|\n",
      "|[hdfs://776faf4d6...|  1|\n",
      "|[hdfs://776faf4d6...|  2|\n",
      "|[hdfs://776faf4d6...|  3|\n",
      "|[hdfs://776faf4d6...|  4|\n",
      "+--------------------+---+\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import monotonically_increasing_id\n",
    "\n",
    "# add ID\n",
    "df_index = df.select(\"*\").withColumn(\"id\", monotonically_increasing_id())\n",
    "df_index.explain()\n",
    "df_index.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "39e61e2a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('id', 'bigint'), ('origin', 'string'), ('height', 'int'), ('width', 'int'), ('data', 'string'), ('bands', 'int')]\n",
      "== Physical Plan ==\n",
      "*(1) Project [id#22L, image#14.origin AS origin#65, image#14.height AS height#66, image#14.width AS width#67, cast(image#14.data as string) AS data#68, image#14.nBands AS bands#69]\n",
      "+- *(1) Project [image#14, monotonically_increasing_id() AS id#22L]\n",
      "   +- InMemoryTableScan [image#14]\n",
      "         +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "               +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# \"image.wkt as Geom\",\n",
    "df_export = df_index.selectExpr(\n",
    "    \"id\",\n",
    "    \"image.origin as origin\",\n",
    "    \"image.height as height\",\n",
    "    \"image.width as width\",\n",
    "    \"cast(image.data as string) as data\",\n",
    "    \"image.nBands as bands\",\n",
    ")\n",
    "print(df_export.dtypes)\n",
    "df_export.explain()\n",
    "df_export.createOrReplaceTempView(\"df_export\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "089277c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# df_export.repartition(\"origin\").write.format('csv').option('header', True).partitionBy(\"origin\").mode('overwrite').option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)\n",
    "# df_export.write.format('csv').option('header', True).option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)\n",
    "# start = 0\n",
    "# end = 10\n",
    "# part_df_export =  spark.sql('select * from df_export where id between '+str(start)+' and '+str(end))\n",
    "# part_df_export.show(7)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "abba167b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# df_writer = part_df_export.write.format('csv').option('header', True).option('sep', ',')\n",
    "# df_writer.save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "0c3ae716",
   "metadata": {},
   "outputs": [],
   "source": [
    "# POR 1 LINHA SER GRANDE O SUFICIENTE PARA ESTOURO DE MEMORIA O COLLECT NÂO FUNCIONA E NEM SALVAR O DF_SPARK DIRETO\n",
    "# (NECESSÀRIO TRANFORMAR PARA PANDAS LINHA A LINHA)\n",
    "# part_df_export.take(3)\n",
    "part_df_export = df_export.take(1)\n",
    "# print(part_df_export)\n",
    "pd.DataFrame(part_df_export).to_csv(\"teste.csv\", sep=\",\", encoding=\"utf-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "5f9a5f60",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:05:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
      "|              origin|                Geom|height|width|                data|bands|\n",
      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.546...|   186|  300|[409.0, 404.0, 41...|    4|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.546...|   186|  300|[1838.0, 1778.0, ...|    4|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|   199|  257|[931.0, 971.0, 95...|    4|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|   199|  257|[957.0, 995.0, 97...|    4|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|   199|  257|[428.0, 428.0, 43...|    4|\n",
      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
      "only showing top 5 rows\n",
      "\n",
      "[('origin', 'string'), ('Geom', 'udt'), ('height', 'int'), ('width', 'int'), ('data', 'array<double>'), ('bands', 'int')]\n",
      "== Physical Plan ==\n",
      "InMemoryTableScan [origin#111, Geom#112, height#113, width#114, data#115, bands#116]\n",
      "   +- InMemoryRelation [origin#111, Geom#112, height#113, width#114, data#115, bands#116], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "         +- Project [image#14.origin AS origin#111, st_geomfromwkt(image#14.wkt) AS Geom#112, image#14.height AS height#113, image#14.width AS width#114, image#14.data AS data#115, image#14.nBands AS bands#116]\n",
      "            +- InMemoryTableScan [image#14]\n",
      "                  +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "                        +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = df.selectExpr(\n",
    "    \"image.origin as origin\",\n",
    "    \"ST_GeomFromWkt(image.wkt) as Geom\",\n",
    "    \"image.height as height\",\n",
    "    \"image.width as width\",\n",
    "    \"image.data as data\",\n",
    "    \"image.nBands as bands\",\n",
    ").cache()\n",
    "df.show(5)\n",
    "print(df.dtypes)\n",
    "df.explain()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "9044edaa",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:05:03 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "[Stage 7:>                                                          (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
      "|              origin|                Geom|                  B2|                  B3|                  B4|                  B8|      constant_evi_2|      constant_evi_1|      constant_evi_3|      constant_tgi_1|      constant_tgi_2|           corrector|\n",
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.546...|[409.0, 404.0, 41...|[713.0, 673.0, 70...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.546...|[1838.0, 1778.0, ...|[1074.0, 1026.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[931.0, 971.0, 95...|[1282.0, 1356.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[957.0, 995.0, 97...|[1282.0, 1354.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[428.0, 428.0, 43...|[880.0, 874.0, 79...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df = df.selectExpr(\n",
    "    \"origin\",\n",
    "    \"Geom\",\n",
    "    \"RS_GetBand(data, 1,bands) as B2\",\n",
    "    \"RS_GetBand(data, 2,bands) as B3\",\n",
    "    \"RS_GetBand(data, 3,bands) as B4\",\n",
    "    \"RS_GetBand(data, 4,bands) as B8\",\n",
    "    \"RS_Array(height * width, 2.4) as constant_evi_2\",\n",
    "    \"RS_Array(height * width, 2.5) as constant_evi_1\",\n",
    "    \"RS_Array(height * width, 1.0) as constant_evi_3\",\n",
    "    \"RS_Array(height * width, -0.5) as constant_tgi_1\",\n",
    "    \"RS_Array(height * width, 120.0) as constant_tgi_2\",\n",
    "    \"RS_Array(height * width, 0.001) as corrector\",\n",
    ").cache()\n",
    "df.createOrReplaceTempView(\"allbands\")\n",
    "df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "767d580c",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 9:>                                                          (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
      "|         image_date|feature_name|              origin|                Geom|                  B2|                  B3|                  B4|                  B8|      constant_evi_2|      constant_evi_1|      constant_evi_3|      constant_tgi_1|      constant_tgi_2|           corrector|\n",
      "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
      "|2021-12-26 13:42:12|          70|hdfs://776faf4d6a...|POLYGON ((-54.546...|[409.0, 404.0, 41...|[713.0, 673.0, 70...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|2021-12-21 13:42:05|          70|hdfs://776faf4d6a...|POLYGON ((-54.546...|[1838.0, 1778.0, ...|[1074.0, 1026.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|2021-12-26 13:42:12|           3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[931.0, 971.0, 95...|[1282.0, 1356.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|2021-12-26 13:42:12|           3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[957.0, 995.0, 97...|[1282.0, 1354.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "|2021-12-16 13:42:11|           3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[428.0, 428.0, 43...|[880.0, 874.0, 79...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
      "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "# Não tem data da imagem\n",
    "# Não tem parte a qual ela se refere\n",
    "# Necessário adicionar\n",
    "origin = df.selectExpr(\"origin\")\n",
    "split_origin = origin.select(split(col(\"origin\"), \"/\"))\n",
    "split_origin.head()\n",
    "# 20211226T134212\n",
    "split_origin = spark.sql(\n",
    "    \"select to_timestamp(REPLACE(SPLIT(SPLIT(origin,'/')[5], '_')[1],'T',' '),'yyyyMMdd HHmmss') as image_date, SPLIT(origin,'/')[4] as feature_name, * from allbands\"\n",
    ")\n",
    "split_origin.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "627e4dbe",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:05:06 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n",
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
     ]
    }
   ],
   "source": [
    "# Fator de correcao da banda para ficar com valores entre 0 e 1\n",
    "correct_origin = split_origin.selectExpr(\n",
    "    \"RS_MultiplyBands(B2, corrector) as bluen\",\n",
    "    \"RS_MultiplyBands(B3, corrector) as greenn\",\n",
    "    \"RS_MultiplyBands(B4, corrector) as redn\",\n",
    "    \"RS_MultiplyBands(B8, corrector) as nirn\",\n",
    "    \"*\",\n",
    ").cache()\n",
    "correct_origin = correct_origin.selectExpr(\n",
    "    \"RS_NormalizedDifference(nirn, redn) as gndvi\",\n",
    "    \"RS_SubtractBands(nirn, redn) as sub_nirn_redn\",\n",
    "    \"RS_AddBands(nirn,constant_evi_2) as add_nirn_constant_evi_2\",\n",
    "    \"RS_AddBands(redn, constant_evi_3) as add_redn_constant_evi_3\",\n",
    "    \"RS_DivideBands(nirn, greenn) as div_nirn_greenn\",\n",
    "    \"RS_SubtractBands(greenn, redn) as sub_greenn_redn\",\n",
    "    \"RS_SubtractBands(redn, greenn) as sub_redn_greenn\",\n",
    "    \"RS_SubtractBands(redn, bluen) as sub_redn_bluen\",\n",
    "    \"RS_AddBands(greenn, redn) as add_greenn_redn\",\n",
    "    \"*\",\n",
    ").cache()\n",
    "\n",
    "correct_origin = correct_origin.selectExpr(\n",
    "    \"RS_SubtractBands(add_greenn_redn, bluen) as greenn_redn_sub_bluen\",\n",
    "    \"RS_AddBands(add_greenn_redn, bluen) as greenn_redn_add_bluen\",\n",
    "    \"RS_SubtractBands(sub_greenn_redn, bluen) as sub_greenn_redn_bluen\",\n",
    "    \"RS_SubtractBands(sub_redn_greenn, constant_tgi_2) as sub_red_gren_tgi_2\",\n",
    "    \"*\",\n",
    ").cache()\n",
    "correct_origin = correct_origin.selectExpr(\n",
    "    \"RS_MultiplyFactor(sub_redn_bluen,120) as ms_redn_bluen_120\", \"*\"\n",
    ").cache()\n",
    "correct_origin = correct_origin.selectExpr(\n",
    "    \"RS_MultiplyFactor(sub_redn_greenn,190) as ms_redn_greenn_190\", \"*\"\n",
    ").cache()\n",
    "correct_origin = correct_origin.selectExpr(\n",
    "    \"RS_SubtractBands(ms_redn_greenn_190,ms_redn_bluen_120) as sub_msrg_190_msrb_120\",\n",
    "    \"*\",\n",
    ").cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "33d5ed66",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
      "[Stage 10:>                                                         (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
      "|               gndvi|                 evi|                 gci|                vari|                 gli|                 tgi|              origin|         image_date|feature_name|\n",
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
      "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[2.35, 2.5, 2.45,...|[1.0, 1.0, 1.0, 1...|[43.1949999999999...|hdfs://776faf4d6a...|2021-12-26 13:42:12|          70|\n",
      "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[-1.41, -1.36, -1...|[1.0, 1.0, 1.0, 1...|[-8.25, -9.210000...|hdfs://776faf4d6a...|2021-12-21 13:42:05|          70|\n",
      "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[3.65, 3.52, 3.9,...|[1.0, 1.0, 1.0, 1...|[65.93, 70.560000...|hdfs://776faf4d6a...|2021-12-26 13:42:12|           3|\n",
      "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[3.94, 3.77, 4.17...|[1.0, 1.0, 1.0, 1...|[64.37, 68.929999...|hdfs://776faf4d6a...|2021-12-26 13:42:12|           3|\n",
      "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[1.95, 1.96, 2.19...|[1.0, 1.0, 1.0, 1...|[57.9199999999999...|hdfs://776faf4d6a...|2021-12-16 13:42:11|           3|\n",
      "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
      "only showing top 5 rows\n",
      "\n",
      "root\n",
      " |-- gndvi: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- evi: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- gci: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- vari: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- gli: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- tgi: array (nullable = false)\n",
      " |    |-- element: double (containsNull = true)\n",
      " |-- origin: string (nullable = true)\n",
      " |-- image_date: timestamp (nullable = true)\n",
      " |-- feature_name: string (nullable = true)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "# Reference formulas for the vegetation indices computed below\n",
    "# (rasterio-style notation; each band is scaled by 1/10000):\n",
    "#   bluen  = src.read(1, masked=True) / 10000\n",
    "#   greenn = src.read(2, masked=True) / 10000\n",
    "#   redn   = src.read(3, masked=True) / 10000\n",
    "#   nirn   = src.read(4, masked=True) / 10000\n",
    "#   evi    = 2.5 * (nirn - redn) / (nirn + 2.4 * redn + 1)\n",
    "#   gci    = (nirn / greenn) - 1\n",
    "#   gli    = (2 * greenn - redn - bluen) / (2 * greenn + redn + bluen)\n",
    "#   gndvi  = (nirn - greenn) / (nirn + greenn)\n",
    "#   tgi    = (-0.5) * (190 * (redn - greenn) - 120 * (redn - bluen))\n",
    "#   vari   = (greenn - redn) / (greenn + redn - bluen)\n",
    "\n",
    "# GNDVI is the normalized difference of NIR and *green*; the previous expression\n",
    "# passed the red band instead. Sedona's RS_NormalizedDifference(band1, band2)\n",
    "# evaluates (band2 - band1) / (band2 + band1), so green goes first and NIR second\n",
    "# (confirm the operand order against the installed Sedona version).\n",
    "# Assumes correct_origin still carries the greenn band column - TODO confirm.\n",
    "# NOTE(review): the remaining expressions rely on helper columns built in earlier\n",
    "# cells; their operand order was not re-checked against the formulas above.\n",
    "calculated = correct_origin.selectExpr(\n",
    "    \"RS_NormalizedDifference(greenn, nirn) as gndvi\",\n",
    "    \"RS_DivideBands(RS_MultiplyBands(constant_evi_1, sub_nirn_redn), RS_MultiplyBands(add_nirn_constant_evi_2, add_redn_constant_evi_3)) as evi\",\n",
    "    \"RS_SubtractBands(div_nirn_greenn, constant_evi_3) as gci\",\n",
    "    \"RS_DivideBands(sub_greenn_redn, greenn_redn_sub_bluen) as vari\",\n",
    "    \"RS_DivideBands(RS_MultiplyFactor(sub_greenn_redn_bluen,2),RS_MultiplyFactor(greenn_redn_add_bluen, 2)) as gli\",\n",
    "    \"RS_MultiplyBands(constant_tgi_1,sub_msrg_190_msrb_120)  as tgi\",\n",
    "    \"origin\",\n",
    "    \"image_date\",\n",
    "    \"feature_name\",\n",
    ").cache()\n",
    "# Preview the index arrays and confirm each one is an array<double> column.\n",
    "calculated.show(5)\n",
    "calculated.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "156303b2",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "c7664928",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/01/06 20:05:11 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
      "|gndvi|evi|gci|vari|gli|   tgi|              origin|         image_date|feature_name|\n",
      "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
      "|  0.0|0.0|1.0| 0.0|1.0| -64.0|hdfs://776faf4d6a...|2021-12-26 13:42:12|          70|\n",
      "|  0.0|0.0|1.0| 0.0|1.0|-94.53|hdfs://776faf4d6a...|2021-12-21 13:42:05|          70|\n",
      "|  0.0|0.0|1.0| 0.0|1.0|-49.08|hdfs://776faf4d6a...|2021-12-26 13:42:12|           3|\n",
      "|  0.0|0.0|1.0| 0.0|1.0|-50.85|hdfs://776faf4d6a...|2021-12-26 13:42:12|           3|\n",
      "|  0.0|0.0|1.0| 0.0|1.0| -23.8|hdfs://776faf4d6a...|2021-12-16 13:42:11|           3|\n",
      "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
      "only showing top 5 rows\n",
      "\n",
      "root\n",
      " |-- gndvi: double (nullable = false)\n",
      " |-- evi: double (nullable = false)\n",
      " |-- gci: double (nullable = false)\n",
      " |-- vari: double (nullable = false)\n",
      " |-- gli: double (nullable = false)\n",
      " |-- tgi: double (nullable = false)\n",
      " |-- origin: string (nullable = true)\n",
      " |-- image_date: timestamp (nullable = true)\n",
      " |-- feature_name: string (nullable = true)\n"
     ]
    }
   ],
   "source": [
    "# Reduce every index array to its mean with RS_Mean, one value per row, keeping the\n",
    "# identifying columns (origin, image_date, feature_name) alongside.\n",
    "index_names = [\"gndvi\", \"evi\", \"gci\", \"vari\", \"gli\", \"tgi\"]\n",
    "mean_exprs = [\"RS_Mean({0}) as {0}\".format(name) for name in index_names]\n",
    "\n",
    "calculated_mean = calculated.selectExpr(\n",
    "    *mean_exprs, \"origin\", \"image_date\", \"feature_name\"\n",
    ").cache()\n",
    "\n",
    "calculated_mean.show(5)\n",
    "calculated_mean.printSchema()\n",
    "\n",
    "# Expose the result to Spark SQL under the view name all_mean.\n",
    "calculated_mean.createOrReplaceTempView(\"all_mean\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "id": "b3cb84bc",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-64.0, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/70/20211226T134211_20211226T134212_T21JYM.tif', image_date=datetime.datetime(2021, 12, 26, 13, 42, 12), feature_name='70'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-94.53, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/70/20211221T134209_20211221T134205_T21JYM.tif', image_date=datetime.datetime(2021, 12, 21, 13, 42, 5), feature_name='70'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-49.08, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211226T134211_20211226T134212_T21KYP.tif', image_date=datetime.datetime(2021, 12, 26, 13, 42, 12), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-50.85, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211226T134211_20211226T134212_T21JYN.tif', image_date=datetime.datetime(2021, 12, 26, 13, 42, 12), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-23.8, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211216T134211_20211216T134211_T21KYP.tif', image_date=datetime.datetime(2021, 12, 16, 13, 42, 11), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-24.49, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211216T134211_20211216T134211_T21JYN.tif', image_date=datetime.datetime(2021, 12, 16, 13, 42, 11), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-18.95, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/70/20211216T134211_20211216T134211_T21JYM.tif', image_date=datetime.datetime(2021, 12, 16, 13, 42, 11), feature_name='70'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-374.07, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211221T134209_20211221T134205_T21JYN.tif', image_date=datetime.datetime(2021, 12, 21, 13, 42, 5), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-375.15, 
origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/3/20211221T134209_20211221T134205_T21KYP.tif', image_date=datetime.datetime(2021, 12, 21, 13, 42, 5), feature_name='3'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-35.97, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/1021/20211226T134211_20211226T134212_T21JYN.tif', image_date=datetime.datetime(2021, 12, 26, 13, 42, 12), feature_name='1021')]\n"
     ]
    }
   ],
   "source": [
    "# Author's note: a single row can be large enough to exhaust memory, so neither\n",
    "# collect() nor saving the Spark DataFrame directly works; the rows have to be\n",
    "# converted to pandas one at a time.\n",
    "# part_df_export.take(3)\n",
    "\n",
    "# Pull only the first 10 rows to the driver and echo them.\n",
    "part_df_export = calculated_mean.limit(10).collect()\n",
    "print(part_df_export)\n",
    "\n",
    "# pandas treats pyspark Row objects as plain tuples, so the column names are passed\n",
    "# explicitly; without them the CSV header would be the positions 0..N-1.\n",
    "pd.DataFrame(part_df_export, columns=calculated_mean.columns).to_csv(\n",
    "    \"teste.csv\", sep=\",\", encoding=\"utf-8\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "5ae0f827",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Garbage collector: collected 199 objects.\n"
     ]
    }
   ],
   "source": [
    "# SAVE COPY TO HDFS\n",
    "# Author's note: this hits the same insufficient-threshold problem that occurs in fit.\n",
    "import gc\n",
    "\n",
    "# Run a garbage-collection pass and report the count returned by gc.collect().\n",
    "collected = gc.collect()\n",
    "print(\"Garbage collector: collected %d objects.\" % (collected,))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "aae72e9c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Disabled export: would write calculated_mean as CSV with a header row, partitioned by\n",
    "# origin and overwriting any previous output, under the HDFS path built from\n",
    "# analise_folder (not defined in this cell).\n",
    "# calculated_mean.repartition(\"origin\").write.format('csv').option('header', True).partitionBy(\"origin\").mode('overwrite').option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "ed45f9a8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Random Forest\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "from pyspark.ml.linalg import Vectors\n",
    "from pyspark.ml.feature import (\n",
    "    IndexToString,\n",
    "    StringIndexer,\n",
    "    VectorIndexer,\n",
    "    VectorAssembler,\n",
    ")\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "from pyspark.ml.tuning import ParamGridBuilder\n",
    "import numpy as np\n",
    "from pyspark.ml.tuning import CrossValidator\n",
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.ml.feature import OneHotEncoder"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "35139bb1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- vari: double (nullable = false)\n",
      "\n",
      "+----+\n",
      "|vari|\n",
      "+----+\n",
      "| 0.0|\n",
      "| 0.0|\n",
      "| 0.0|\n",
      "| 0.0|\n",
      "| 0.0|\n",
      "+----+\n",
      "only showing top 5 rows\n",
      "\n",
      "+----+-----+---+------+---+------+--------------------+\n",
      "|vari|gndvi|evi|   tgi|gli|labels|            features|\n",
      "+----+-----+---+------+---+------+--------------------+\n",
      "| 0.0|  0.0|0.0| -64.0|1.0|    70|(5,[3,4],[-64.0,1...|\n",
      "| 0.0|  0.0|0.0|-94.53|1.0|    70|(5,[3,4],[-94.53,...|\n",
      "| 0.0|  0.0|0.0|-49.08|1.0|     3|(5,[3,4],[-49.08,...|\n",
      "| 0.0|  0.0|0.0|-50.85|1.0|     3|(5,[3,4],[-50.85,...|\n",
      "| 0.0|  0.0|0.0| -23.8|1.0|     3|(5,[3,4],[-23.8,1...|\n",
      "+----+-----+---+------+---+------+--------------------+\n",
      "only showing top 5 rows\n"
     ]
    }
   ],
   "source": [
    "# Inspect the vari column on its own: schema first, then the leading rows.\n",
    "vari = calculated_mean.select(\"vari\")\n",
    "vari.printSchema()\n",
    "vari.show(5)\n",
    "\n",
    "# Keep five of the index means as predictors and cast feature_name to a long\n",
    "# column named labels.\n",
    "df_rf_assembler = calculated_mean.selectExpr(\n",
    "    \"vari\",\n",
    "    \"gndvi\",\n",
    "    \"evi\",\n",
    "    \"tgi\",\n",
    "    \"gli\",\n",
    "    \"cast(feature_name as long) as labels\",\n",
    ")\n",
    "\n",
    "# Format required by fit: every non-label column assembled into one features vector.\n",
    "feature_list = [name for name in df_rf_assembler.columns if name != \"labels\"]\n",
    "assembler = VectorAssembler(outputCol=\"features\", inputCols=feature_list)\n",
    "# rf = RandomForestClassifier(labelCol=\"labels\", featuresCol=\"features\")\n",
    "df_rf_assembler = assembler.transform(df_rf_assembler)\n",
    "df_rf_assembler.show(5)\n",
    "\n",
    "# Disabled train/test split:\n",
    "# (trainingData, testData) = df_rf_assembler.randomSplit([0.8, 0.2])\n",
    "# trainingData.show(5)\n",
    "# testData.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "f5534a95",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Recursively delete the path teste through the hdfs helper object (created outside\n",
    "# this cell); the recorded result of this call is True.\n",
    "# NOTE(review): presumably this clears the HDFS scratch directory used by the\n",
    "# commented-out model save/load example (its temp_path ends in /teste) - confirm.\n",
    "hdfs.delete_file_dir(\"teste\", recursive=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "9f878429",
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy\n",
    "from numpy import allclose\n",
    "from pyspark.ml.linalg import Vectors\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.ml.classification import (\n",
    "    RandomForestClassifier,\n",
    "    RandomForestClassificationModel,\n",
    ")\n",
    "\n",
    "# df = spark.createDataFrame([\n",
    "#     (1.0, Vectors.dense(1.0)),\n",
    "#     (0.0, Vectors.sparse(1, [], []))], [\"label\", \"features\"])\n",
    "\n",
    "# stringIndexer = StringIndexer(inputCol=\"labels\", outputCol=\"indexed\")\n",
    "# si_model = stringIndexer.fit(df)\n",
    "# td = si_model.transform(df)\n",
    "# rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol=\"indexed\", seed=42,\n",
    "#     leafCol=\"leafId\")\n",
    "# rf.getMinWeightFractionPerNode()\n",
    "\n",
    "# model = rf.fit(td)\n",
    "# model.getLabelCol()\n",
    "\n",
    "# model.setFeaturesCol(\"features\")\n",
    "\n",
    "# model.setRawPredictionCol(\"newRawPrediction\")\n",
    "\n",
    "# model.getBootstrap()\n",
    "\n",
    "# model.getRawPredictionCol()\n",
    "\n",
    "# model.featureImportances\n",
    "\n",
    "# allclose(model.treeWeights, [1.0, 1.0, 1.0])\n",
    "\n",
    "# test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], [\"features\"])\n",
    "# model.predict(test0.head().features)\n",
    "\n",
    "# model.predictRaw(test0.head().features)\n",
    "\n",
    "# model.predictProbability(test0.head().features)\n",
    "\n",
    "# result = model.transform(test0).head()\n",
    "# result.prediction\n",
    "\n",
    "# numpy.argmax(result.probability)\n",
    "\n",
    "# numpy.argmax(result.newRawPrediction)\n",
    "\n",
    "# result.leafId\n",
    "\n",
    "# test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], [\"features\"])\n",
    "# model.transform(test1).head().prediction\n",
    "\n",
    "# model.trees\n",
    "# temp_path= 'hdfs://776faf4d6a1e:8020/teste'\n",
    "# rfc_path = temp_path + \"/rfc\"\n",
    "# rf.save(rfc_path)\n",
    "# rf2 = RandomForestClassifier.load(rfc_path)\n",
    "# rf2.getNumTrees()\n",
    "\n",
    "# model_path = temp_path + \"/rfc_model\"\n",
    "# model.save(model_path)\n",
    "# model2 = RandomForestClassificationModel.load(model_path)\n",
    "# model.featureImportances == model2.featureImportances\n",
    "\n",
    "# model.transform(test0).take(1) == model2.transform(test0).take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cd579ea4",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "380226cf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
